#!/usr/bin/env bash
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# Default JMX options: remote JMX switched on, with authentication and SSL disabled.
# They apply only when KAFKA_JMX_OPTS is unset or empty, so a caller can override
# them (for example to add the com.sun.management.jmxremote.rmi.port property).
if [ "${KAFKA_JMX_OPTS:-}" = "" ]; then
    KAFKA_JMX_OPTS="-Dcom.sun.management.jmxremote=true \
    -Dcom.sun.management.jmxremote.authenticate=false \
    -Dcom.sun.management.jmxremote.ssl=false "
    export KAFKA_JMX_OPTS
fi

# java.rmi.server.hostname has to be an address the JMX client can reach.
# With bridged networking the default is the bridged IP, which is reachable only
# from other docker containers; with host networking it is the IP that the
# host's hostname resolves to.
#
# When several networks are configured, hostname -i lists every IP separated by
# spaces; the first one is used unless KAFKA_JMX_HOSTNAME is already non-empty.
: "${KAFKA_JMX_HOSTNAME:=$(hostname -i | cut -d ' ' -f 1)}"
export KAFKA_JMX_HOSTNAME

if [ -n "${KAFKA_JMX_PORT:-}" ]; then
    # Exporting JMX_PORT together with the rmi.port/port properties below ensures
    # that the "if" section for JMX_PORT in the kafka launch script does not trigger.
    JMX_PORT=${KAFKA_JMX_PORT}
    KAFKA_JMX_OPTS="${KAFKA_JMX_OPTS-} -Djava.rmi.server.hostname=${KAFKA_JMX_HOSTNAME} \
    -Dcom.sun.management.jmxremote.local.only=false \
    -Dcom.sun.management.jmxremote.rmi.port=${KAFKA_JMX_PORT} \
    -Dcom.sun.management.jmxremote.port=${KAFKA_JMX_PORT}"
    export JMX_PORT KAFKA_JMX_OPTS
fi

# Keep a copy of the user-provided performance opts in a temporary env variable.
# The copy is the empty string when KAFKA_JVM_PERFORMANCE_OPTS is unset or empty.
export TEMP_KAFKA_JVM_PERFORMANCE_OPTS="${KAFKA_JVM_PERFORMANCE_OPTS-}"

# Storage formatting runs first, so append the CDS archive built for the storage
# tool to whatever performance opts are currently set.
KAFKA_JVM_PERFORMANCE_OPTS="${KAFKA_JVM_PERFORMANCE_OPTS-} -XX:SharedArchiveFile=/opt/kafka/storage.jsa"
export KAFKA_JVM_PERFORMANCE_OPTS

printf '%s\n' "===> Using provided cluster id ${CLUSTER_ID} ..."

# Invoke the docker wrapper to set up property files and format storage.
# The wrapper's combined stdout/stderr is captured in $result and is not shown
# when the wrapper succeeds. A non-zero exit is tolerated only when the output
# contains "already formatted" (case-insensitive; the matching line is printed).
# Any other failure prints the captured output and aborts with status 1.
if ! result=$(/opt/kafka/bin/kafka-run-class.sh kafka.docker.KafkaDockerWrapper setup \
      --default-configs-dir /etc/kafka/docker \
      --mounted-configs-dir /mnt/shared/config \
      --final-configs-dir /opt/kafka/config 2>&1); then
    # $result stays quoted so the output is neither word-split nor glob-expanded.
    if ! grep -i "already formatted" <<<"$result"; then
        printf '%s\n' "$result"
        # A plain exit is needed here: "(exit 1)" only terminates a subshell, so
        # without errexit the script would go on to start the broker anyway.
        exit 1
    fi
fi

# Drop the storage CDS flag by rebuilding the performance opts from the saved
# user-provided value, then append the CDS archive used to start the kafka server.
export KAFKA_JVM_PERFORMANCE_OPTS="${TEMP_KAFKA_JVM_PERFORMANCE_OPTS} -XX:SharedArchiveFile=/opt/kafka/kafka.jsa"

# Start the kafka broker with the generated /opt/kafka/config/server.properties.
# exec replaces this shell process with kafka-server-start.sh, so on success
# nothing after this line is executed.
exec /opt/kafka/bin/kafka-server-start.sh /opt/kafka/config/server.properties
